{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder.appName('nlp').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import Tokenizer, RegexTokenizer\n",
    "from pyspark.sql.functions import col, udf\n",
    "from pyspark.sql.types import IntegerType"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+\n",
      "| id|            sentence|\n",
      "+---+--------------------+\n",
      "|  0|Hi I heard about ...|\n",
      "|  1|I wish java could...|\n",
      "|  2|Logistic,regressi...|\n",
      "+---+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sen_df = spark.createDataFrame([\n",
    "    (0, 'Hi I heard about Spark'),\n",
    "    (1, 'I wish java could use case classes'),\n",
    "    (2, 'Logistic,regression,model,are,neat')\n",
    "], ['id', 'sentence'])\n",
    "\n",
    "sen_df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+\n",
      "| id|            sentence|               words|\n",
      "+---+--------------------+--------------------+\n",
      "|  0|Hi I heard about ...|[hi, i, heard, ab...|\n",
      "|  1|I wish java could...|[i, wish, java, c...|\n",
      "|  2|Logistic,regressi...|[logistic,regress...|\n",
      "+---+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tokenizer = Tokenizer(inputCol = 'sentence', outputCol = 'words')\n",
    "regex_tokenizer = RegexTokenizer(inputCol = 'sentence', outputCol = 'words', pattern = '\\\\W')\n",
    "\n",
    "count_tokens = udf(lambda words: len(words), IntegerType())\n",
    "tokenized = tokenizer.transform(sen_df)\n",
    "tokenized.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+------+\n",
      "| id|            sentence|               words|tokens|\n",
      "+---+--------------------+--------------------+------+\n",
      "|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|\n",
      "|  1|I wish java could...|[i, wish, java, c...|     7|\n",
      "|  2|Logistic,regressi...|[logistic,regress...|     1|\n",
      "+---+--------------------+--------------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tokenized.withColumn('tokens', count_tokens(col('words'))).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+\n",
      "| id|            sentence|               words|\n",
      "+---+--------------------+--------------------+\n",
      "|  0|Hi I heard about ...|[hi, i, heard, ab...|\n",
      "|  1|I wish java could...|[i, wish, java, c...|\n",
      "|  2|Logistic,regressi...|[logistic, regres...|\n",
      "+---+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "regex_tokenized = regex_tokenizer.transform(sen_df)\n",
    "regex_tokenized.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+------+\n",
      "| id|            sentence|               words|tokens|\n",
      "+---+--------------------+--------------------+------+\n",
      "|  0|Hi I heard about ...|[hi, i, heard, ab...|     5|\n",
      "|  1|I wish java could...|[i, wish, java, c...|     7|\n",
      "|  2|Logistic,regressi...|[logistic, regres...|     5|\n",
      "+---+--------------------+--------------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "regex_tokenized.withColumn('tokens', count_tokens(col('words'))).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+\n",
      "| id|              tokens|\n",
      "+---+--------------------+\n",
      "|  0|[I, saw, the, gre...|\n",
      "|  1|[Mary, had, a, li...|\n",
      "+---+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import StopWordsRemover\n",
    "\n",
    "sentenceDataFrame = spark.createDataFrame([\n",
    "    (0, ['I', 'saw', 'the', 'green', 'horse']),\n",
    "    (1, ['Mary', 'had', 'a', 'little', 'lamb'])\n",
    "], ['id', 'tokens'])\n",
    "\n",
    "sentenceDataFrame.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+\n",
      "| id|              tokens|            filtered|\n",
      "+---+--------------------+--------------------+\n",
      "|  0|[I, saw, the, gre...| [saw, green, horse]|\n",
      "|  1|[Mary, had, a, li...|[Mary, little, lamb]|\n",
      "+---+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "remover = StopWordsRemover(inputCol = 'tokens', outputCol = 'filtered')\n",
    "remover.transform(sentenceDataFrame).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+\n",
      "| id|               words|\n",
      "+---+--------------------+\n",
      "|  0|[Hi, I, heard, ab...|\n",
      "|  1|[I, wish, java, c...|\n",
      "|  2|[Logistic, regres...|\n",
      "+---+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import NGram\n",
    "\n",
    "wordDataFrame = spark.createDataFrame([\n",
    "    (0, ['Hi', 'I', 'heard', 'about', 'Spark']), \n",
    "    (1, ['I', 'wish', 'java', 'could', 'use', 'case', 'classes']),\n",
    "    (2, ['Logistic', 'regression', 'models', 'are', 'neat'])\n",
    "], ['id', 'words'])\n",
    "\n",
    "wordDataFrame.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+\n",
      "| id|               words|               grams|\n",
      "+---+--------------------+--------------------+\n",
      "|  0|[Hi, I, heard, ab...|[Hi I, I heard, h...|\n",
      "|  1|[I, wish, java, c...|[I wish, wish jav...|\n",
      "|  2|[Logistic, regres...|[Logistic regress...|\n",
      "+---+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "ngram = NGram(inputCol = 'words', outputCol = 'grams')\n",
    "ngram.transform(wordDataFrame).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------------------------------------------------------+\n",
      "|grams                                                             |\n",
      "+------------------------------------------------------------------+\n",
      "|[Hi I, I heard, heard about, about Spark]                         |\n",
      "|[I wish, wish java, java could, could use, use case, case classes]|\n",
      "|[Logistic regression, regression models, models are, are neat]    |\n",
      "+------------------------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "ngram.transform(wordDataFrame).select('grams').show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------------------+\n",
      "|label|            sentence|\n",
      "+-----+--------------------+\n",
      "|  0.0|Hi I heard about ...|\n",
      "|  0.0|I wish Java could...|\n",
      "|  1.0|Logistic regressi...|\n",
      "+-----+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import HashingTF, IDF, Tokenizer\n",
    "\n",
    "sentenceDataFrame = spark.createDataFrame([\n",
    "    (0.0, 'Hi I heard about Spark'), \n",
    "    (0.0, 'I wish Java could use case classes'),\n",
    "    (1.0, 'Logistic regression models are neat')\n",
    "], ['label', 'sentence'])\n",
    "\n",
    "sentenceDataFrame.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------------------+--------------------+\n",
      "|label|            sentence|               words|\n",
      "+-----+--------------------+--------------------+\n",
      "|  0.0|Hi I heard about ...|[hi, i, heard, ab...|\n",
      "|  0.0|I wish Java could...|[i, wish, java, c...|\n",
      "|  1.0|Logistic regressi...|[logistic, regres...|\n",
      "+-----+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tokenizer = Tokenizer(inputCol = 'sentence', outputCol = 'words')\n",
    "word_df = tokenizer.transform(sentenceDataFrame)\n",
    "word_df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------------------+--------------------+--------------------+\n",
      "|label|            sentence|               words|         rawFeatures|\n",
      "+-----+--------------------+--------------------+--------------------+\n",
      "|  0.0|Hi I heard about ...|[hi, i, heard, ab...|(262144,[24417,49...|\n",
      "|  0.0|I wish Java could...|[i, wish, java, c...|(262144,[20719,24...|\n",
      "|  1.0|Logistic regressi...|[logistic, regres...|(262144,[13671,91...|\n",
      "+-----+--------------------+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "hashing_tf = HashingTF(inputCol = 'words', outputCol = 'rawFeatures')\n",
    "featurized_df = hashing_tf.transform(word_df)\n",
    "featurized_df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------------------+--------------------+--------------------+--------------------+\n",
      "|label|            sentence|               words|         rawFeatures|            features|\n",
      "+-----+--------------------+--------------------+--------------------+--------------------+\n",
      "|  0.0|Hi I heard about ...|[hi, i, heard, ab...|(262144,[24417,49...|(262144,[24417,49...|\n",
      "|  0.0|I wish Java could...|[i, wish, java, c...|(262144,[20719,24...|(262144,[20719,24...|\n",
      "|  1.0|Logistic regressi...|[logistic, regres...|(262144,[13671,91...|(262144,[13671,91...|\n",
      "+-----+--------------------+--------------------+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "idf = IDF(inputCol = 'rawFeatures', outputCol = 'features')\n",
    "idf_model = idf.fit(featurized_df)\n",
    "rescaled_df = idf_model.transform(featurized_df)\n",
    "rescaled_df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+---------------+\n",
      "| id|          words|\n",
      "+---+---------------+\n",
      "|  0|      [a, b, c]|\n",
      "|  1|[a, b, b, c, a]|\n",
      "+---+---------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import CountVectorizer\n",
    "\n",
    "df = spark.createDataFrame([\n",
    "    (0, 'a b c'.split(' ')),\n",
    "    (1, 'a b b c a'.split(' '))\n",
    "], ['id', 'words'])\n",
    "\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "cv = CountVectorizer(inputCol = 'words', outputCol = 'features', vocabSize = 3, minDF = 2.0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+---------------+-------------------------+\n",
      "|id |words          |features                 |\n",
      "+---+---------------+-------------------------+\n",
      "|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|\n",
      "|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|\n",
      "+---+---------------+-------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cv_model = cv.fit(df)\n",
    "cv_df = cv_model.transform(df)\n",
    "cv_df.show(truncate=False)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
